| org.apache.beam.sdk.io.Read.Unbounded<byte[]> unbounded = | ||
| org.apache.beam.sdk.io.Read.from(getSource()); | ||
|
|
||
| PTransform<PBegin, PCollection<byte[]>> transform = unbounded; |
There was a problem hiding this comment.
This snippet is repeating in all unbounded sources we have. What do you think about deduplicating it? E.g. making .withMaxNumRecords() special-case Long.MAX_VALUE, and making withMaxReadTime special-case Duration.millis(Long.MAX_VALUE) to do exactly this, so that you can say:
return input.getPipeline().apply(Read.from(getSource()).withMaxNumRecords(maxNumRecords).withMaxReadTime(maxReadTime));
And one more thing: maxNumRecords and maxReadTime are not part of the source - they're not used in the code of the source; they're actually configuration of the transform on top of the source.
| public void populateDisplayData(DisplayData.Builder builder) { | ||
| super.populateDisplayData(builder); | ||
|
|
||
| source.populateDisplayData(builder); |
There was a problem hiding this comment.
Seems like UnboundedMqttSource doesn't implement populateDisplayData.
|
|
||
| } | ||
|
|
||
| private static class UnboundedMqttReader extends UnboundedSource.UnboundedReader |
|
|
||
| private UnboundedMqttSource source; | ||
|
|
||
| private MqttCheckpointMark checkpointMark; |
There was a problem hiding this comment.
This field is effectively unused; the checkpoint mark is passed around but it doesn't do anything, in particular, it does not affect what data is being read; I suppose because the MQTT protocol actually doesn't support checkpointing and resuming. To better express that, why not just use Void as the checkpoint type, and get rid of MqttCheckpointMark.
| public void close() throws IOException { | ||
| try { | ||
| if (client != null) { | ||
| client.disconnect(); |
There was a problem hiding this comment.
Can disconnect() throw an exception? Is it an issue that in that case you don't close the client?
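One way to address this concern is to close the client in a `finally` block, so it is released even when `disconnect()` fails. The sketch below is only an illustration: `ClientSketch` is a hypothetical minimal interface standing in for the real MQTT client, and `disconnectAndClose` is an assumed helper name, not code from this PR.

```java
import java.io.IOException;

// Hypothetical minimal client interface, standing in for the real MQTT client.
interface ClientSketch {
  void disconnect() throws Exception;

  void close() throws Exception;
}

class CloseSketch {
  // Disconnects, then closes the client even if disconnect() threw.
  static void disconnectAndClose(ClientSketch client) throws IOException {
    if (client == null) {
      return;
    }
    try {
      try {
        client.disconnect();
      } finally {
        // Runs whether or not disconnect() succeeded.
        client.close();
      }
    } catch (Exception e) {
      // Surface any failure as the IOException that close() is declared to throw.
      throw new IOException(e);
    }
  }
}
```

With this shape the caller still sees the failure, but the client's resources are not leaked.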
| } | ||
|
|
||
| @Override | ||
| public byte[] getCurrent() { |
There was a problem hiding this comment.
Throw a NoSuchElementException if it's unavailable.
| } | ||
|
|
||
| @Override | ||
| public void messageArrived(String topic, MqttMessage message) { |
There was a problem hiding this comment.
What's the thread safety of this? Can it be called from multiple threads?
| private final int qos; | ||
| private final boolean retained; | ||
|
|
||
| private MqttClient client; |
|
I made a mistake in the commit I wanted to push. I'm fixing the issues. Sorry about that. |
f826b4a to
420fa85
Compare
|
Updated. |
|
The Jenkins failure is not related to MQTT (it failed on the WordCount example). |
420fa85 to
f981ea5
Compare
|
Rebased to see if it helps to fix the |
| public Coder getDefaultOutputCoder() { | ||
| return SerializableCoder.of(byte[].class); | ||
| } | ||
|
|
There was a problem hiding this comment.
Clean up blank lines like these too. Generally they are a good idea between members, but not before the first member or after last.
|
|
||
| private final UnboundedMqttSource source; | ||
|
|
||
| private transient MqttClient client; |
There was a problem hiding this comment.
It's unnecessary to make these transient, readers are not serialized.
| } | ||
| }); | ||
| return advance(); | ||
| } catch (Exception e) { |
There was a problem hiding this comment.
What other exception is there to be thrown here, other than IOException? seems like you can remove the try/catch.
There was a problem hiding this comment.
Please address this comment. I suppose you mean MqttException - change this to catch(MqttException).
| KV<MqttMessage, Instant> message = queue.take(); | ||
| current = message.getKey().getPayload(); | ||
| watermark = message.getValue(); | ||
| if (current == null) { |
|
|
||
| @Override | ||
| public boolean advance() throws IOException { | ||
| LOGGER.info("Taking from the pending queue ({})", queue.size()); |
There was a problem hiding this comment.
Logging at INFO once per message is too verbose. Imagine what this will do with tens of thousands of messages per second.
| MqttClient client = new MqttClient(serverUri(), clientId()); | ||
| client.connect(); | ||
| if (topic() != null) { | ||
| client.subscribe(topic()); |
There was a problem hiding this comment.
Does this part make sense for the writer too?
There was a problem hiding this comment.
No, it's only for the Read. I'll remove this part from here.
| client.connect(); | ||
| if (topic() != null) { | ||
| client.subscribe(topic()); | ||
| } |
There was a problem hiding this comment.
If topic is null, then we don't subscribe at all? What does it mean to have a null topic?
| client.setCallback(new MqttCallback() { | ||
| @Override | ||
| public void connectionLost(Throwable cause) { | ||
| LOGGER.warn("MQTT connection lost", cause); |
There was a problem hiding this comment.
What should the reader do in this case?
|
|
||
| @Override | ||
| public void deliveryComplete(IMqttDeliveryToken token) { | ||
| // nothing to do |
There was a problem hiding this comment.
Can this possibly be called when reading?
There was a problem hiding this comment.
No, only when publishing (when async).
| public boolean advance() throws IOException { | ||
| LOGGER.info("Taking from the pending queue ({})", queue.size()); | ||
| try { | ||
| KV<MqttMessage, Instant> message = queue.take(); |
There was a problem hiding this comment.
Will this block forever if no messages are arriving?
There was a problem hiding this comment.
Yes, according to the javadoc, take() will wait if the blocking queue is empty.
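The blocking behaviour discussed here can be avoided with a timed `poll()` instead of `take()`. The following is a minimal sketch using only JDK classes; `PollingReaderSketch`, `enqueue` and the timeout parameter are hypothetical names for illustration and do not reproduce the reader in this PR.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the reader; names are illustrative, not the MqttIO code.
class PollingReaderSketch {
  private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
  private byte[] current;

  // Would be called from the message callback when a payload arrives.
  void enqueue(byte[] payload) {
    queue.add(payload);
  }

  // Waits at most timeoutMillis; returns false when nothing arrived in that time.
  boolean advance(long timeoutMillis) {
    try {
      current = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      // Restore the interrupt flag and report "no record available".
      Thread.currentThread().interrupt();
      current = null;
    }
    return current != null;
  }

  // Fails loudly when there is no current record.
  byte[] getCurrent() {
    if (current == null) {
      throw new NoSuchElementException("no current record");
    }
    return current;
  }
}
```

Returning `false` lets the caller come back later instead of holding the thread while the topic is idle.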
| * }</pre> | ||
| */ | ||
| public class MqttIO { | ||
|
|
There was a problem hiding this comment.
It's not my IDE, it's me actually, I like to have "aerated" code (my feeling ;)).
f981ea5 to
4dfd890
Compare
| return new AutoValue_MqttIO_MqttConnectionConfiguration(serverUri, clientId, topic); | ||
| } | ||
|
|
||
| private void validate() { |
There was a problem hiding this comment.
Isn't this redundant with the validation happening in create()? - create() should not allow to create an invalid object.
There was a problem hiding this comment.
Good point. I'll remove the validate() method and do the validation only in the create() method.
| return toBuilder().setMqttConnectionConfiguration(configuration).build(); | ||
| } | ||
|
|
||
| public Read withMaxNumRecords(long maxNumRecords) { |
There was a problem hiding this comment.
Either disallow setting both, or specify which takes priority between maxNumRecords and maxReadTime.
There was a problem hiding this comment.
Yes, maxNumRecords and maxReadTime should be exclusive.
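The "disallow setting both" option could look roughly like the sketch below. It is an assumption-laden illustration: `ReadLimitsSketch` is a hypothetical immutable holder, `java.time.Duration` is used in place of whatever duration type the transform really takes, and `Long.MAX_VALUE` / `null` are assumed to be the "unset" markers.

```java
import java.time.Duration;

// Hypothetical holder for the two limits; not the actual Read transform.
class ReadLimitsSketch {
  final long maxNumRecords;   // Long.MAX_VALUE means "no record limit" (assumed marker)
  final Duration maxReadTime; // null means "no time limit" (assumed marker)

  ReadLimitsSketch() {
    this(Long.MAX_VALUE, null);
  }

  private ReadLimitsSketch(long maxNumRecords, Duration maxReadTime) {
    this.maxNumRecords = maxNumRecords;
    this.maxReadTime = maxReadTime;
  }

  // Rejects the call when a time limit was already configured.
  ReadLimitsSketch withMaxNumRecords(long maxNumRecords) {
    if (maxReadTime != null) {
      throw new IllegalArgumentException("maxNumRecords and maxReadTime are exclusive");
    }
    return new ReadLimitsSketch(maxNumRecords, null);
  }

  // Rejects the call when a record limit was already configured.
  ReadLimitsSketch withMaxReadTime(Duration maxReadTime) {
    if (maxNumRecords != Long.MAX_VALUE) {
      throw new IllegalArgumentException("maxNumRecords and maxReadTime are exclusive");
    }
    return new ReadLimitsSketch(Long.MAX_VALUE, maxReadTime);
  }
}
```

Failing at configuration time means the user learns about the conflict while building the pipeline rather than while it runs.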
| private static class UnboundedMqttSource | ||
| extends UnboundedSource<byte[], UnboundedSource.CheckpointMark> { | ||
|
|
||
| private Read spec; |
| PipelineOptions options) { | ||
| List<UnboundedMqttSource> sources = new ArrayList<>(); | ||
| for (int i = 0; i < desiredNumSplits; i++) { | ||
| // NB: it's important that user understand the impact of MQTT message QoS |
There was a problem hiding this comment.
This comment seems out of place here - at least, I'm not sure how to interpret it. Is it intended to be read by the user? If yes, then it should be somewhere in a prominent place in the javadoc where a user is likely to notice it when getting started with this class.
There was a problem hiding this comment.
Agree, it was more in the tests implementation. It makes sense to remove this comment (useless).
|
|
||
| @Override | ||
| public Coder getDefaultOutputCoder() { | ||
| return SerializableCoder.of(byte[].class); |
There was a problem hiding this comment.
Use ByteArrayCoder instead. SerializableCoder is very inefficient.
| .isEqualTo(10L); | ||
| PAssert.that(output).satisfies(new SerializableFunction<Iterable<byte[]>, Void>() { | ||
|
|
||
| int count = 0; |
| }); | ||
|
|
||
| // produce messages on the broker in another thread | ||
| // This thread prevents to block the pipeline waiting for new messages |
There was a problem hiding this comment.
If you make advance() use a take() with a timeout, will this be unnecessary?
There was a problem hiding this comment.
Yes, it's a different thing: the thread is there so that publishing and consuming happen at the same time.
| Thread thread = new Thread() { | ||
| public void run() { | ||
| try { | ||
| // gives time to the pipeline to start |
There was a problem hiding this comment.
Explain why do we need to wait for the pipeline to start? Will MQTT drop messages if nobody's subscribed?
In general, I very strongly suggest to avoid timing-dependent tests. It always seems like you can choose a "large enough" delay, but I've again and again found that this is not the case, and environmental flakiness sooner or later causes the test to fail (maybe 1 in 1000 times - but at Beam's scale in terms of number of tests and number of test runs (e.g. pre-commits), pretty soon you have enough tests with this level of flakiness and pretty soon the overall flakiness grows unacceptable) and you have to rewrite the test. Is it possible to avoid this?
There was a problem hiding this comment.
Yes, MQTT broker will delete the messages if there are no subscribers on the topic. That's why I created another thread.
I introduced a startup timeout to give time to the test pipeline to start.
I don't see an easy way to improve that. Let me try changing retained and QoS if I can avoid the client thread.
There was a problem hiding this comment.
I see you changed retained and QoS - can you avoid the client thread now?
There was a problem hiding this comment.
Yes, the purpose of QoS and retained change was to avoid the client thread.
I tried to remove the client thread. Unfortunately, I don't know why yet, but the test seems to be stuck. Debugging didn't help for now. I will try to ping you to investigate together if you don't mind.
By the way, I have similar code in another project (Apache Karaf Decanter) where it works fine. So, I suspect some kind of race condition or thread deadlock with the pipeline and the direct runner.
There was a problem hiding this comment.
Let's investigate together. I'm not comfortable submitting an IO connector where we don't understand why its tests work or don't work.
| .withQoS(2)); | ||
| pipeline.run(); | ||
|
|
||
| Assert.assertEquals(100, messages.size()); |
There was a problem hiding this comment.
Why is this guaranteed to be 100 at this point? Is this thread-safe / race-free? Is it possible that the pipeline already finished, but your receiver has not yet received all the messages?
There was a problem hiding this comment.
It's guaranteed thanks to QoS 2. With this QoS, the pipeline write will wait for the ack from the receiver.
There was a problem hiding this comment.
Please add a comment about this. This depends on 2 things:
- QoS 2 blocks the writer until a receiver sends an ack,
- MqttClient sends an ack only after messageArrived() has successfully completed.
This was not obvious to me until I read javadocs of MqttClient.
There was a problem hiding this comment.
I added a comment before the pipeline.apply. Does it look good to you?
There was a problem hiding this comment.
I believe it's not sufficient. I'll wait for your reply to my comments on the documentation of different QoS in the main java file.
There was a problem hiding this comment.
QoS 2 makes no guarantees on when the message will be delivered to the subscriber - it synchronizes the publisher against the broker, and synchronizes the broker against the subscriber, but the broker is still allowed to buffer messages. By the time you do this assertEquals(), the message may still be buffered on the broker and not yet delivered to the subscriber, so you may get something less than 100 here (including getting 0).
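Given that delivery to the subscriber may lag behind `pipeline.run()`, one possible fix is for the test to wait until the receiver has seen the expected number of messages before asserting. The sketch below uses a JDK `CountDownLatch`; `AwaitMessagesSketch` and its method names are hypothetical, and the timeout is only an upper bound for a failing run, not a delay the test relies on.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical test helper; the real receiver would call messageArrived() from its callback.
class AwaitMessagesSketch {
  private final CountDownLatch remaining;

  AwaitMessagesSketch(int expectedMessages) {
    this.remaining = new CountDownLatch(expectedMessages);
  }

  // Records one delivered message.
  void messageArrived() {
    remaining.countDown();
  }

  // Returns true once all expected messages arrived, false if the bound elapsed first.
  boolean awaitAll(long timeoutMillis) {
    try {
      return remaining.await(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

A test would call `awaitAll(...)` after `pipeline.run()` and only then check the contents of the received list.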
|
|
||
| @Override | ||
| public void messageArrived(String topic, MqttMessage message) throws Exception { | ||
| synchronized (messages) { |
There was a problem hiding this comment.
Nobody else is synchronizing on "messages", so this method is synchronizing against itself, which is pointless because it is invoked sequentially anyway. Which thread safety guarantee is this synchronized block trying to accomplish?
There was a problem hiding this comment.
My bad, the synchronized is not required and actually, it's not a problem as messages list is accessed only by the receiver. Again, thanks to the QoS 2, we have a single access to the messages list.
7f08fbf to
541da93
Compare
jkff
left a comment
There was a problem hiding this comment.
Thanks! I think the semantics is mostly correct now. Most of my remaining comments are cosmetic.
| * | ||
| * <h3>Reading from a MQTT broker</h3> | ||
| * | ||
| * <p>MqttIO source returns an unbounded collection of {@code byte[]} as |
There was a problem hiding this comment.
MqttIO returns an unbounded {@code PCollection<byte[]>} containing message payloads.
| * {@code PCollection<byte[]>}, where {@code byte[]} is the MQTT message payload.</p> | ||
| * | ||
| * <p>To configure a MQTT source, you have to provide a MQTT connection configuration including | ||
| * {@code ClientId}, a {@code ServerURI}, and eventually a {@code Topic} pattern. The following |
There was a problem hiding this comment.
Not sure what "eventually" is supposed to mean here? Did you mean "optionally", "finally", or something else? I think the sentence will make as much sense if you drop the word.
There was a problem hiding this comment.
I removed "eventually" (I had "optionally" in mind, but the topic is actually not optional).
| * | ||
| * pipeline.apply( | ||
| * MqttIO.read() | ||
| * .withMqttConnectionConfiguration(MqttIO.MqttConnectionConfiguration.create( |
There was a problem hiding this comment.
Please rename this to withConnectionConfiguration to have fewer repetitions of the word "Mqtt" in this line. Same for write.
| * <p>To configure a MQTT sink, as for the read, you have to specify a MQTT connection | ||
| * configuration with {@code ClientId}, {@code ServerURI}, {@code Topic}.</p> | ||
| * | ||
| * <p>Eventually, you can also specify the {@code Retained} and {@code QoS} of the MQTT |
There was a problem hiding this comment.
Again, "eventually" seems out of place here.
There was a problem hiding this comment.
+1, it's actually "optionally"
| public abstract static class Read extends PTransform<PBegin, PCollection<byte[]>> { | ||
|
|
||
| @Nullable abstract MqttConnectionConfiguration mqttConnectionConfiguration(); | ||
| @Nullable abstract long maxNumRecords(); |
There was a problem hiding this comment.
maxNumRecords is not nullable, it's a long. (ideally this would be an error caught by checkstyle)
| * persistence using {@code MqttConnectOptions}. | ||
| * If a persistence mechanism is not specified, the message will not be | ||
| * delivered in the event of a client failure. | ||
| * The message will be acknowledged across the network. |
There was a problem hiding this comment.
What exactly does this mean: does it mean that writing a message will wait for a subscriber to acknowledge it? How long will it wait? What if there's currently no subscriber, or if the subscriber forgets to acknowledge? Do the answers to this depend on whether or not the message is persisted? (I suppose if it's persisted by the broker, then the writer doesn't need to wait for a subscriber ack because it's now the broker's responsibility to ensure it's delivered? But I'm not familiar enough with Mqtt)
There was a problem hiding this comment.
Correct, it's on the broker side. The broker will wait for a subscriber ack (depending on the QoS) before removing the message from the persistence store.
| * | ||
| * <li>Quality of Service 2 - indicates that a message should | ||
| * be delivered once. The message will be persisted to disk, and will | ||
| * be subject to a two-phase acknowledgement across the network. |
| } | ||
| } | ||
| }; | ||
| thread.start(); |
There was a problem hiding this comment.
You need to also join the thread, so that it doesn't interfere with other test methods.
There was a problem hiding this comment.
What do you mean by "join"? Using an ExecutorService?
There was a problem hiding this comment.
I tried changing thread.start() to thread.join(), and then I get the "blocked thread" issue. It seems that something in the pipeline or the direct runner impacts my thread (maybe an ExecutorService).
There was a problem hiding this comment.
You need to thread.start() to start the thread, and then thread.join() to join it (i.e. wait for it to complete). Sounds like you called join() without calling start(), which deadlocked because the thread wasn't started.
There was a problem hiding this comment.
It's what I did (start and then join), but the thread is blocked.
There was a problem hiding this comment.
I forgot what was the conclusion here?
Here's what I had in mind:
thread.start();
pipeline.run();
thread.join();
So that there's no lingering activity going on after the test method has completed.
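The start / run / join ordering described above can be shown with plain JDK threads. In this sketch `JoinSketch` and `runWithPublisher` are hypothetical names, the `Runnable` argument stands in for `pipeline.run()`, and the publisher body is a placeholder counter rather than real MQTT publishing.

```java
import java.util.concurrent.atomic.AtomicInteger;

class JoinSketch {
  // Starts the publisher, runs the pipeline stand-in, then waits for the publisher to finish.
  static int runWithPublisher(Runnable pipelineRun) {
    AtomicInteger published = new AtomicInteger();
    Thread publisher = new Thread(() -> {
      // Placeholder for publishing 10 messages to the broker.
      for (int i = 0; i < 10; i++) {
        published.incrementAndGet();
      }
    });

    publisher.start();  // must come before join(), otherwise there is nothing to wait for
    pipelineRun.run();  // stand-in for pipeline.run()
    try {
      publisher.join(); // no publisher activity survives past this point
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while joining the publisher", e);
    }
    return published.get();
  }
}
```

Because `join()` returns only after the publisher thread has ended, the value read afterwards is complete and nothing leaks into the next test method.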
541da93 to
2e4656d
Compare
| Assert.assertEquals("This is test " + count, inputString); | ||
| count++; | ||
| Assert.assertTrue(inputString.startsWith("This is test ")); | ||
| int count = Integer.parseInt(inputString.substring("This is test ".length())); |
There was a problem hiding this comment.
This test only verifies that we don't get garbage (which seems extremely unlikely - I can not imagine what bug in the MqttIO implementation could lead to receiving garbage in a test like this), but it does not verify that messages aren't dropped/duplicated, which is really the important part. So this test will still pass, e.g., if the code drops all messages.
You need to verify by putting the strings into a HashSet and comparing it against an expected HashSet. Or perhaps even better, apply Flatten.iterables() and verify it using PAssert.contains() against an expected list - it will handle the unordered-ness for you.
There was a problem hiding this comment.
Yes, it makes sense. Let me improve that.
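The check requested above compares the received payloads against the full expected collection instead of inspecting each string on its own. The sketch below is a plain-JDK variation on the HashSet idea: it compares sorted lists, which also catches duplicates that a set would hide. `ExactContentsSketch` and `receivedExactly` are hypothetical names, and the `"This is test N"` payload format is taken from the test under review.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class ExactContentsSketch {
  // True only if the payloads are exactly "This is test 0" .. "This is test (n-1)", in any order.
  static boolean receivedExactly(List<byte[]> received, int expectedCount) {
    List<String> expected = new ArrayList<>();
    for (int i = 0; i < expectedCount; i++) {
      expected.add("This is test " + i);
    }
    List<String> actual = new ArrayList<>();
    for (byte[] payload : received) {
      actual.add(new String(payload, StandardCharsets.UTF_8));
    }
    // Sorting removes the dependence on arrival order while keeping duplicates visible.
    Collections.sort(expected);
    Collections.sort(actual);
    return expected.equals(actual);
  }
}
```

A dropped message shortens the actual list and a duplicated one lengthens it, so both cases fail the comparison.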
| * | ||
| * <h3>Writing to a MQTT broker</h3> | ||
| * | ||
| * <p>MqttIO sink supports writing {@code byte[]} to a topic on a MQTT broker.</p> |
There was a problem hiding this comment.
</p> is unnecessary everywhere, this HTML tag is self-closing.
There was a problem hiding this comment.
javadoc throws warning if an opening <p> is not closed with </p>.
There was a problem hiding this comment.
That's very surprising to me, since the official Oracle documentation about Javadoc does not use </p> - http://www.oracle.com/technetwork/java/javase/documentation/index-137868.html
There was a problem hiding this comment.
Sorry, the warning is not from javadoc but from checkstyle.
If I use this form (as in the Oracle documentation):
line
<p>
line
it results with:
[ERROR] (javadoc) JavadocParagraph: <p> tag should be placed immediately before the first word, with no space after.
[ERROR] (javadoc) JavadocParagraph: <p> tag should be preceded with an empty line.
So, I have to use at least:
<p>line
<p>line
with an empty line before <p> (to avoid <p> tag should be preceded with an empty line.).
There was a problem hiding this comment.
Anyway, I remove the closing </p> and keep only <p>.
| } | ||
|
|
||
| @Override | ||
| public Coder getCheckpointMarkCoder() { |
There was a problem hiding this comment.
This is a raw type. It needs to be Coder<CheckpointMarkT> where CheckpointMarkT is your UnboundedSource's second type parameter.
There was a problem hiding this comment.
There's no actual checkpoint in the source. I'm using a VoidCoder there. That's why I didn't explicitly define the type.
There was a problem hiding this comment.
There is a checkpoint (even though you don't use it) - you are returning a non-null value from getCheckpointMark, and this value is being passed to VoidCoder - the only reason this doesn't throw a ClassCastException is that Java's type erasure didn't insert a class cast somewhere on the way, and that VoidCoder.encode ignores the supplied value (e.g. doesn't test that it's actually instanceof Void). At the very least, you'd need to make getCheckpointMark return null.
Then, it would happen to work in practice because you'd always return null for the checkpoint and VoidCoder can encode it too, and it compiles because you effectively disable type-checking by using a raw type - but formally speaking the code is not type-safe: the type of the checkpoint is UnboundedSource.CheckpointMark, but the coder is a Coder<Void>.
So: as a rule of thumb, please never use raw types, period - unless the code really does something that is provably safe but too difficult for the Java type system to express.
In this case, you can make the code type-safe by changing signature of this method to Coder<UnboundedSource.CheckpointMark> and creating a Coder<UnboundedSource.CheckpointMark> that effectively works like VoidCoder - i.e. uses 0 bytes, and can only encode null (and bails on anything else), and always decodes as null. I'm using this in KafkaIO https://github.com/apache/incubator-beam/pull/1048/files#diff-2fa38a7f8d24217f1f7bde0f5c7dbb40R1254
It might make sense to add a class like that to the SDK - e.g. "class NullOnlyCoder", and make VoidCoder derive from NullOnlyCoder<Void>.
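The "NullOnlyCoder" idea could be sketched as follows. This is not the Beam `Coder` API: `CoderSketch` is a hypothetical two-method interface introduced only so the example is self-contained, and `NullOnlyCoderSketch` is an assumed name. The sketch shows the three properties described above: zero bytes written, only `null` accepted, and `null` always decoded.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Hypothetical, simplified coder interface; the real SDK interface has more methods.
interface CoderSketch<T> {
  void encode(T value, OutputStream out) throws IOException;

  T decode(InputStream in) throws IOException;
}

// Typed coder that can represent only null, for any element type T.
class NullOnlyCoderSketch<T> implements CoderSketch<T> {
  @Override
  public void encode(T value, OutputStream out) {
    if (value != null) {
      // Bail on anything that is not null instead of silently ignoring it.
      throw new IllegalArgumentException("can only encode null");
    }
    // Nothing is written: null is encoded as zero bytes.
  }

  @Override
  public T decode(InputStream in) {
    // Nothing is read: every decode yields null.
    return null;
  }
}
```

Because the class is generic, it can be instantiated with the checkpoint mark type, so the method signature no longer needs a raw type.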
| } | ||
|
|
||
| @Override | ||
| public Coder getDefaultOutputCoder() { |
There was a problem hiding this comment.
This is a raw type too. Please consider configuring the IDE to report rawtypes as errors and check if there's anything else I missed.
| public void connectionLost(Throwable cause) { | ||
| LOGGER.warn("MQTT connection lost", cause); | ||
| try { | ||
| close(); |
There was a problem hiding this comment.
(still needs to be addressed)
| * delivered in the event of a client failure.</li> | ||
| * </ul> | ||
| * | ||
| * <p>If persistence is not configured, QoS 1 and 2 messages will still be delivered |
There was a problem hiding this comment.
If persistence is not configured on the broker, you mean?
There was a problem hiding this comment.
Yes, if the broker doesn't support persistence (it's very rare, but can happen).
| * | ||
| * <p>If persistence is not configured, QoS 1 and 2 messages will still be delivered | ||
| * in the event of a network or server problem as the client will hold state in memory. | ||
| * If the MQTT client is shutdown or fails and persistence is not configured then |
There was a problem hiding this comment.
I'd suggest to rephrase this in terms of consequences for someone who uses this transform in a pipeline. The pipeline writer has no control over the MQTT client - it runs inside a bundle that runs inside a VM or container or somewhere else, controlled by the runner and subject to arbitrary failures that the runner transparently handles and retries.
So basically, for a pipeline writer, this means that messages can be duplicated (in case the writer bundle is retried multiple times), but can not be lost: at-least-once delivery.
By the way, this means specifying QoS 2 is meaningless, since we can not provide exactly-once guarantees anyway. In order to get those guarantees, MQTT protocol would need to provide deduplication-by-message-id - does it have a feature like that? If yes, how can the pipeline writer control message ids? (automatically generated ones are not sufficient - see my latest email to beam-dev@ about the Checkpoint transform)
This might be too complicated to address in the current PR, so I'd suggest to just document that this class provides best-effort delivery at QoS 0 and at-least-once delivery at QoS 1, and address exactly-once delivery later.
There was a problem hiding this comment.
I understand your point. Unfortunately, MQTT protocol by itself doesn't provide deduplication.
Let me update the documentation to indicate QoS 0 & 1 are fully supported, not yet QoS 2.
There was a problem hiding this comment.
I would suggest to explicitly prohibit QoS 2 - it's not just "limited", it doesn't work. I.e. throw an exception if a user tries to set QoS 2, because we can't do what they are asking for - and document that it's explicitly not supported, and explain why.
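Explicitly prohibiting QoS 2 could be as small as a check at configuration time. In the sketch below `QosSketch` and `checkQos` are hypothetical names; where the check would live in the transform (for example inside a `withQoS` setter) is an assumption.

```java
class QosSketch {
  // Accepts QoS 0 (best effort) and QoS 1 (at least once); rejects everything else.
  static int checkQos(int qos) {
    if (qos != 0 && qos != 1) {
      throw new IllegalArgumentException(
          "Supported QoS values are 0 (best effort) and 1 (at least once), got " + qos
              + ". QoS 2 is not supported because bundle retries can duplicate messages.");
    }
    return qos;
  }
}
```

The exception message doubles as the explanation of why exactly-once delivery cannot be offered.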
|
|
||
| /** | ||
| * Whether or not the publish message should be retained by the messaging engine. | ||
| * Sending a message with the retained set to {@code false} will clear the |
There was a problem hiding this comment.
I don't understand this: do you mean that if you send a message with retained=true it will be saved, but then if you send the same message with retained=false then it will be removed from the server? How does the server identify "the same" message - do messages have some kind of id?
There was a problem hiding this comment.
When publishing MQTT messages, a publishing client has no guarantee that a message is actually received by a subscribing client. It can only make sure its message gets delivered safely to the broker. The same is true for a subscribing client. If a client is connecting and subscribing to topics it is interested in, there is no guarantee when the subscriber will get the first message, because this totally depends on a publisher on that topic. It can take a few seconds, minutes or hours until the publisher sends a new message on that topic. Until then the subscribing client is totally in the dark about the current status. This is where retained messages come into play.
So retained messages can help newly subscribed clients get a status update immediately after subscribing to a topic, without having to wait until a publishing client sends the next update.
In other words a retained message on a topic is the last known good value, because it doesn’t have to be the last value, but it certainly is the last message with the retained flag set to true.
There was a problem hiding this comment.
OK, I read up a little more about retained messages, and much to my surprise, MQTT in fact stores a single retained message per topic - I thought it has a queue of them. However, http://www.hivemq.com/blog/mqtt-essentials-part-8-retained-messages seems to imply that to clear the topic's retained message, you should publish a retained message with zero-byte payload, rather than a message with retained=false - is it the case?
Also, since PCollections are unordered, it's worth mentioning to the user that there are no guarantees about which particular message in the PCollection will be retained - it can be any one of them.
Also, this is a per-transform flag, rather than per-message flag, so the comment should be adjusted to account for that - if retained=true, then all messages are published as retained, and the topic's retained message will be one of them that's published the latest (and there are no guarantees whatsoever about which of them it will be). If retained = false, then none of the messages are retained.
Actually you may want to change the API to allow specifying QoS and retained flags on individual messages. But I'm not sure.
|
R: -@dhalperi |
|
Resuming my work on this. Update will follow soon. |
64ae37b to
d54258c
Compare
d54258c to
00bcb95
Compare
|
Rebased, and the tests now use a free network port to start the ActiveMQ instance. @jkff Can we sync to see what's pending in the PR? Thanks! |
jkff
left a comment
There was a problem hiding this comment.
Thanks, let's go over these comments tomorrow over chat.
| * "my_client_id", | ||
| * "my_topic")) | ||
| * .withRetained(true) | ||
| * .withQoS(2) |
There was a problem hiding this comment.
Per discussion in the rest of the PR, setting QoS to 2 doesn't make much sense (or at least doesn't do what the user thinks it does), so the example should not use it.
| public void populateDisplayData(DisplayData.Builder builder) { | ||
| super.populateDisplayData(builder); | ||
| connectionConfiguration().populateDisplayData(builder); | ||
| builder.add(DisplayData.item("maxNumRecords", maxNumRecords())); |
There was a problem hiding this comment.
We should add this only if it's not MAX_VALUE.
| * but should only be used for messages which are not valuable - note that | ||
| * if the server cannot process the message (for example, there | ||
| * is an authorization problem), then an | ||
| * {@link MqttCallback#deliveryComplete(IMqttDeliveryToken)} won't be called. |
There was a problem hiding this comment.
This is the documentation that users of MqttIO.Write will see, but this class does not expose MqttCallback - so mentioning it is confusing to clients. It's better to say simply "then the message will be silently dropped".
| for (int i = 0; i < 10; i++) { | ||
| MqttMessage message = new MqttMessage(); | ||
| message.setQos(1); | ||
| message.setRetained(true); |
There was a problem hiding this comment.
The way I understand it, this will overwrite "the" retained message on the topic 10 times (i.e. the retained message will be message number 9), rather than queue up 10 retained messages. I'm very confused as to how this will interact with the reader thread.
There was a problem hiding this comment.
Yes, the publisher should not use retained in the test.
…and improve tests.
…e on the reader. Improve tests.
…s and maxReadTime are now exclusive, use ByteArrayCoder instead of SerializableCoder, use poll instead of take on the blocking queue, add javadoc on the with* methods, code cleanup
… QoS (relationship between publisher and subsribers), auto generate clientId (reliability)
… source, fix on the tests
… 30% better performances.
…onnection in write test
f33eaa9 to
2c51524
Compare